package com.flink.demo;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import scala.Tuple2;

import java.util.Random;

/**
 * Example for the "Apache Flink Getting Started" tutorial, part 4: DataStream API programming.
 *
 * <p>Runs a parallel random source that emits (category, value) pairs and a sink that
 * prints everything it receives, so the console shows both "Emit" and "Get" lines.
 *
 * @author jjh
 * @date 2019/6/27
 **/
public class DataStreamApiExample {

    /**
     * Parallel source emitting random {@code (category, value)} pairs.
     *
     * <p>Each parallel subtask sleeps {@code (subtaskIndex + 1) * 1000 + 500} ms between
     * emissions, so subtask 0 emits roughly every 1.5 s, subtask 1 every 2.5 s, etc.
     * Categories are one of "类别A" / "类别B" / "类别C"; values are in [1, 10].
     */
    private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {

        // Cooperative shutdown flag; flipped by cancel() from another thread,
        // hence volatile for cross-thread visibility.
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            Random random = new Random(System.currentTimeMillis());
            while (running) {
                // Stagger emission rate per subtask. The original code used
                // "(index+1) + 1000 + 500", which made the subtask index contribute
                // only 1-2 ms — almost certainly a typo for "* 1000".
                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000L + 500);
                String key = "类别" + (char) ('A' + random.nextInt(3));
                int value = random.nextInt(10) + 1;
                System.out.println(String.format("Emit:\t(%s,%d)", key, value));
                ctx.collect(new Tuple2<>(key, value));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Builds and runs the job: random source (parallelism 2) -> print sink.
     *
     * @param args unused
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Tuple2<String, Integer>> ds = env.addSource(new DataSource());

        // Simple logging sink: echoes every received element to stdout.
        ds.addSink(new SinkFunction<Tuple2<String, Integer>>() {
            @Override
            public void invoke(Tuple2<String, Integer> value, Context ctx) throws Exception {
                System.out.println(String.format("Get:\t (%s,%d)", value._1, value._2));
            }
        });

        env.execute();
    }

}
